Home Exam 52002 - 2024-2025¶
Instructions¶
* Fill your ID Here: [replace the bracketed text with your ID number]
* Work on the assignment and submit your solution individually.
No sharing of information on the assignment is allowed between students.
* Format: Fill in code, text explanations and output (figures, tables, etc.) in the designated places.
For some questions, the code you fill should run in this .ipynb notebook and generate the output automatically after running (e.g. in google colab).
For the Unix part you will need to run commands in other environments (Ubuntu) - in this case, just copy the commands and the relevant outputs in the designated text blocks.
Rename the solution file to 'HomeExam_52002_2024_25_[ID].ipynb' where [ID] should be replaced by your ID number.
* Submit your completed solution on Moodle by February 28th, 23:59.
* Data: Some of the questions require analyzing and manipulating data files. All the files required for the exam are located in the directory '/sci/home/orzuk/FinalExamBigData' on Moriah. You may copy them to your working directory on Moriah, to your personal computer, or to any other computing environment you use. You may need to unzip the files before using them.
* Grading:
- There are overall $11$ questions in this home exam. Each question is worth $9$ points of your total grade. One additional point will be given for submitting files with correct formats and file names.
* Note: Points may be deducted for submitting wrong/missing parts of files, or for not submitting the complete generated/compiled output.
* Note: Some parts of the code may take a long time to run. Be patient, but don't leave everything to run at the last minute; prepare in advance so that your entire solution runs and finishes on time.
- Note: Solutions with naive or inefficient implementations may receive a reduced score.
Submission Guidelines:¶
By the end of the exam, please submit the following four files:
Networks, Streaming, Unix, and Batch Task Processing:
- Provide your solutions in both .ipynb (Jupyter Notebook) and .html formats. Submit after running all parts of the .ipynb notebook except the Unix part, and check that the outputs of each question were created and saved. For the Unix part, copy the code and results manually into the .ipynb notebook.
Spark Section:
- Submit the fully executed Jupyter Notebook (.ipynb) with all expected outputs, after running it in the Databricks environment.
- Include an .html export of the executed notebook displaying the outputs.
Ensure that all submitted files are clearly labeled and display the required outputs where applicable.
- Good luck!
Part 1: Unix¶
Q1. Preprocessing using Unix¶
a.¶
The file network-review-Oregon.json contains user reviews of different businesses (it is a sample from the full review-Oregon.json file).
Use Unix commands to generate a new file called 'bipartite_network.txt' containing a table derived from the file network-review-Oregon.json. The table should contain only the columns user_id, gmap_id, rating, separated by commas.
Finally, show all the rows in which the user_id is 100000837087364476756.
b.¶
Use Unix commands and the file from (a.) to generate a new file network-table.txt containing one row for each pair of businesses (gmap_id) that were reviewed by the same user (user_id).
The table should contain only the columns gmap_id_from, user_id, rating_from, gmap_id_to, rating_to, separated by commas.
You can split your process into multiple steps, creating intermediate CSV/TXT files and then merging them.
Finally, show all the rows in which the user_id is 100000837087364476756.
Note: If two different users rate the same two businesses, there should be separate rows for their ratings. In addition, each pair of businesses should appear twice, in the two possible orders. For example, if user1 rated the two businesses as 3,4 and user2 rated them as 4,2, the following lines should be in your output file:
gmap_id1, user1, 3, gmap_id2, 4
gmap_id2, user1, 4, gmap_id1, 3
gmap_id1, user2, 4, gmap_id2, 2
gmap_id2, user2, 2, gmap_id1, 4
SolQ1¶
Shell Commands ($):¶
a.¶
mkdir Data
touch Data/bipartite_network.txt
echo "user_id,gmap_id,rating" >> Data/bipartite_network.txt
zcat /sci/home/orzuk/BigDataMiningExam/network-review-Oregon.json.gz | jq -r '[.user_id, .gmap_id, .rating] | @csv' >> Data/bipartite_network.txt
grep '"100000837087364476756",' Data/bipartite_network.txt
b.¶
touch Data/network-table.txt
awk -F, '
NR == 1 { print "gmap_id_from,user_id,rating_from,gmap_id_to,rating_to"; next } # Print header and skip first line
{
key = $1 "_" $2 # Create a unique key: "user_id_gmap_id"
ratings[key] = $3
user_reviews[$1] = user_reviews[$1] "," $2 # Append business IDs for this user
}
END {
for (user in user_reviews) {
n = split(user_reviews[user], businesses, ",") # split() returns the element count
for (i = 2; i < n; i++) { # Start at index 2 to skip leading comma
for (j = i + 1; j <= n; j++) { # Ensure i ≠ j and avoid duplicates
key1 = user "_" businesses[i]
key2 = user "_" businesses[j]
# Print the pair in both orders
print businesses[i] "," user "," ratings[key1] "," businesses[j] "," ratings[key2]
print businesses[j] "," user "," ratings[key2] "," businesses[i] "," ratings[key1]
}
}
}
}' Data/bipartite_network.txt >> Data/network-table.txt
# Verify the user-specific entries
grep '"100000837087364476756",' Data/network-table.txt
sftp -J nathan.pasder@bava.cs.huji.ac.il nathan.pasder@ftp.rcs.huji.ac.il << EOF
lcd ~/Documents/HUJI/BigDataMining52002/Final/Data
lpwd
get Data/bipartite_network.txt
get Data/network-table.txt
bye
EOF
Shell Output ($):¶
a.¶
"100000837087364476756","0x54950a755cc35d8d:0x92d6d400144b2141",5
"100000837087364476756","0x54c17ff7bcaa7c31:0x2970a958143cc922",4
b.¶
"0x54950a755cc35d8d:0x92d6d400144b2141","100000837087364476756",5,"0x54c17ff7bcaa7c31:0x2970a958143cc922",4
"0x54c17ff7bcaa7c31:0x2970a958143cc922","100000837087364476756",4,"0x54950a755cc35d8d:0x92d6d400144b2141",5
Explanation & result analysis¶
a. Preprocessing:
We create a CSV file with headers, extract user_id, gmap_id, and rating from the JSON using jq, and filter the rows where our user_id is "100000837087364476756".
b. Network Table:
We then use awk to group our reviews by our user_id, generate all business pairs (in both orders) for each review, and finally filter the resulting table for our specific ID.
Data Results
- Part a: We see that our review data includes two businesses, with ratings of 5 and 4.
- Part b: We generate two rows representing the bidirectional pairing of our business reviews, ensuring that both orders of the business pair are included.
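As a sanity check of the pairing logic, the same both-orders construction can be sketched in Python with pandas (a toy example, not part of the graded Unix solution; the IDs `u1`, `g1`, etc. are made up and mirror the example in the question):

```python
import pandas as pd
from itertools import permutations

# Toy bipartite table mirroring bipartite_network.txt: user_id, gmap_id, rating
df = pd.DataFrame({
    "user_id": ["u1", "u1", "u2", "u2"],
    "gmap_id": ["g1", "g2", "g1", "g2"],
    "rating":  [3, 4, 4, 2],
})

rows = []
for user, grp in df.groupby("user_id"):
    # All ordered pairs of businesses this user reviewed (both directions)
    for (g_from, r_from), (g_to, r_to) in permutations(zip(grp["gmap_id"], grp["rating"]), 2):
        rows.append((g_from, user, r_from, g_to, r_to))

pairs = pd.DataFrame(rows, columns=["gmap_id_from", "user_id", "rating_from", "gmap_id_to", "rating_to"])
print(pairs)  # 4 rows: each user's business pair appears in both orders
```

Using `permutations` (rather than `combinations`) directly produces both orderings of each pair, matching the requirement that every pair appear twice per user.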
Q2. Batch Task Processing - Moriah¶
We want to calculate the total number of reviews and the average rating for each gmap_id of the full review file (review-Oregon.json).
Implement the following pipeline for this task:
- Split the input file into five files, one for each rating from 1 to 5 (e.g., rating_i.txt), using Unix.
- For each file, submit a job with a Python script that calculates how many times each gmap_id appears. Save the results in a CSV file (e.g. rating_i_counts.txt).
- Run a final Python script to:
  - Read all the CSV files.
  - Combine the results.
  - Calculate the average rating of each gmap_id and the total number of reviews.
  - Use the file meta-Oregon.json to map gmap_id to the business name.
Print the top three gmap_id values sorted by rating, with ties broken by sorting by the number of reviews (both in descending order).
The output table should have the columns: gmap_id, name, avg_rating, total_reviews.
Note: Steps 2-3 should run as a pipeline in a single bash file.
Hint: Use job dependencies because the tasks need to run in order. You can use the Moriah wiki to learn more.
Guidance: Every user has limited compute power, so first write your script on a sample of the data and run it on your local machine; only when you think your code is good, run it on Moriah.
After completing and running the pipeline, copy the Unix script, Python code, and the results table into the next chunks.
Python files (First.py and Second.py):¶
First.py¶
import pandas as pd
import sys
def count_gmap_ids(input_file, output_file):
    try:
        # Load CSV file (ensuring proper column names)
        df = pd.read_csv(input_file)
        # Ensure 'gmap_id' column exists
        if "gmap_id" not in df.columns:
            print(f"❌ Error: 'gmap_id' column not found in {input_file}")
            return
        # Count occurrences of gmap_id
        gmap_counts = df["gmap_id"].value_counts().reset_index()
        gmap_counts.columns = ["gmap_id", "count"]
        # Save results to a CSV-formatted TXT file
        gmap_counts.to_csv(output_file, index=False)
        print(f"✅ Completed processing: {input_file} -> {output_file}")
    except Exception as e:
        print(f"❌ Error processing {input_file}: {e}")

if __name__ == '__main__':
    if len(sys.argv) < 3:
        print("Usage: python3 count_gmap.py <input_file> <output_file>")
        sys.exit(1)
    input_file = sys.argv[1]
    output_file = sys.argv[2]
    count_gmap_ids(input_file, output_file)
Explanation 'First.py'¶
We implement a function to count occurrences of each gmap_id in a CSV file. We:
- Load Data: Use Pandas to read the CSV and ensure the gmap_id column exists.
- Count Frequencies: Apply value_counts() on gmap_id, then reset the index and rename the columns to gmap_id and count.
- Output: Write the results to a CSV-formatted TXT file.
This script accepts command-line arguments, providing robust error handling and status messages to verify processing.
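The counting step at the heart of First.py can be illustrated on a tiny made-up table (the IDs below are hypothetical, not from the exam data):

```python
import pandas as pd

# Three reviews for "a", two for "b", one for "c"
df = pd.DataFrame({"gmap_id": ["a", "b", "a", "a", "b", "c"]})

# Same pattern as First.py: count, reset index, rename columns
gmap_counts = df["gmap_id"].value_counts().reset_index()
gmap_counts.columns = ["gmap_id", "count"]
print(gmap_counts)  # sorted by count descending: a=3, b=2, c=1
```

`value_counts()` already returns the IDs sorted by frequency (descending), which is why no extra sort is needed at this stage.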
Second.py¶
import pandas as pd
import numpy as np
import json
import sys
import os
def aggregate_reviews(data_dir, meta_file, output_file):
    try:
        # Load metadata to map gmap_id -> business name
        gmap_to_name = {}
        with open(meta_file, 'r', encoding='utf-8') as f:
            for line in f:
                try:
                    item = json.loads(line.strip())  # Read each line as a JSON object
                    gmap_to_name[item["gmap_id"]] = item["name"]
                except json.JSONDecodeError:
                    print(f"⚠️ Skipping malformed JSON line: {line.strip()}")
        # Collect the per-rating count files, then concatenate once
        frames = []
        for i in range(1, 6):
            rating_file = os.path.join(data_dir, f"rating_{i}_counts.txt")
            if os.path.exists(rating_file) and os.path.getsize(rating_file) > 0:
                df = pd.read_csv(rating_file)
                df["rating"] = i  # Assign the rating value to each row
                frames.append(df)
        all_data = pd.concat(frames, ignore_index=True)
        # Aggregate: Calculate total reviews & average rating per gmap_id
        grouped = all_data.groupby("gmap_id").agg(
            total_reviews=("count", "sum"),
            avg_rating=("rating", lambda x: np.average(x, weights=all_data.loc[x.index, 'count']))
        ).reset_index()
        # Map business names
        grouped["name"] = grouped["gmap_id"].map(gmap_to_name)
        # Sort: First by avg_rating (desc), then by total_reviews (desc)
        grouped = grouped.sort_values(by=["avg_rating", "total_reviews"], ascending=[False, False])
        # Save results
        grouped.to_csv(output_file, index=False)
        print(f"✅ Final results saved to: {output_file}")
        # Print top 3 businesses
        print("\n🏆 Top 3 Businesses:")
        print(grouped.head(3).to_string(index=False))
    except Exception as e:
        print(f"❌ Error in aggregation: {e}")

if __name__ == '__main__':
    if len(sys.argv) < 4:
        print("Usage: python3 Second.py <data_directory> <meta_file> <output_file>")
        sys.exit(1)
    data_dir = sys.argv[1]
    meta_file = sys.argv[2]
    output_file = sys.argv[3]
    aggregate_reviews(data_dir, meta_file, output_file)
Explanation 'Second.py'¶
This script aggregates review counts and computes weighted average ratings:
- Metadata Mapping: Reads each JSON line from the metadata file to build a mapping from gmap_id to business names.
- Data Aggregation: Iterates through the five rating_i_counts.txt files. For each file, we assign a static rating value (1–5) and merge the data into a single DataFrame.
- Grouped Computations: Using groupby("gmap_id"), we sum the review counts and calculate a weighted average rating (using the review counts as weights).
- Sorting & Output: We map business names, then sort by average rating (descending) and by total reviews as a tiebreaker. The final aggregated table is written as a CSV file.
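The weighted-average aggregation can be checked on a tiny hypothetical frame (the IDs and counts below are made up for illustration): business "x" received rating 5 three times and rating 4 once, so its average should be (5·3 + 4·1)/4 = 4.75.

```python
import pandas as pd
import numpy as np

# Hypothetical per-rating counts, same shape as the merged rating_i_counts data
all_data = pd.DataFrame({
    "gmap_id": ["x", "x", "y"],
    "rating":  [5, 4, 3],
    "count":   [3, 1, 2],
})

# Same named-aggregation pattern as Second.py: sum counts, weight ratings by counts
grouped = all_data.groupby("gmap_id").agg(
    total_reviews=("count", "sum"),
    avg_rating=("rating", lambda x: np.average(x, weights=all_data.loc[x.index, "count"])),
).reset_index()
print(grouped)  # x: 4 reviews, avg 4.75; y: 2 reviews, avg 3.0
```

The lambda closes over `all_data` to look up the weights for the rows of each group, which is exactly why the weights line up with the group's ratings.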
main_bash.sh¶
#!/bin/bash
#SBATCH --job-name=BigDataMiningP1Q2_bigdata_pipeline
#SBATCH --output=bigdata_pipeline.out
#SBATCH --error=bigdata_pipeline.err
#SBATCH --time=2:00:00
#SBATCH --mem=16G
#SBATCH --cpus-per-task=4
DATA_DIR="Data"
mkdir -p "$DATA_DIR"
OrZukData="/sci/home/orzuk/BigDataMiningExam"
REVIEW_FILE="$OrZukData/review-Oregon.json.gz"
META_FILE="$OrZukData/meta-Oregon.json.gz"
STEP_2_PYTHON_SCRIPT="First.py"
STEP_3_PYTHON_SCRIPT="Second.py"
FINAL_OUTPUT="$DATA_DIR/final_results.csv"
wait_for_jobs() {
local job_ids=(${1//,/ }) # Split the comma-separated job ID list into an array
local step_name=$2
local start_time=$(date +%s)
echo -n "⏳ Waiting for $step_name to complete... (00:00:00)"
while true; do
all_completed=true # Assume all jobs are done unless proven otherwise
sleep 1 # Check every 1 second
for job_id in "${job_ids[@]}"; do
# Check if the job is still running
if squeue -j "$job_id" | grep -q "$job_id"; then
all_completed=false # At least one job is still running
break
fi
# If job is not in squeue, check its historical status in sacct
job_status=$(sacct -j "$job_id" --format=State --noheader | awk '{print $1}' | sort | uniq)
if [[ "$job_status" =~ FAILED|CANCELLED|TIMEOUT ]]; then # Unquoted so the pattern is treated as a regex
echo -e "\n❌ $step_name failed. Job ID: $job_id, Status: $job_status"
exit 1
elif [[ "$job_status" == "COMPLETED" ]]; then
continue # Job is done, check the next one
elif [[ -z "$job_status" ]]; then
echo -e "\n⚠️ Warning: Job ID $job_id not found in SLURM history. Assuming completed."
else
all_completed=false # Job status is unclear, so keep waiting
break
fi
done
# If all jobs are completed, exit the loop
if $all_completed; then
break
fi
# Update elapsed time dynamically
current_time=$(date +%s)
elapsed=$((current_time - start_time))
hours=$((elapsed / 3600))
minutes=$(((elapsed % 3600) / 60))
seconds=$((elapsed % 60))
printf "\r⏳ Waiting for $step_name to complete... (%02d:%02d:%02d)" $hours $minutes $seconds
done
# Final message
printf "\r✅ $step_name completed in (%02d:%02d:%02d)\n" $hours $minutes $seconds
}
echo "🚀 Submitting SLURM Jobs for Optimized Pipeline..."
# ===================== STEP 1: SPLIT REVIEWS (PARALLEL JOBS) =====================
echo "📂 Submitting Step 1: Splitting reviews into parallel jobs..."
STEP1_JOB_IDS=()
for i in {1..5}; do
STEP1_JOB=$(sbatch --parsable <<EOF
#!/bin/bash
#SBATCH --job-name=BigDataMiningP1Q2_split_rating_$i
#SBATCH --output=split_rating_$i.out
#SBATCH --mem=4G
#SBATCH --cpus-per-task=1
output_file="$DATA_DIR/rating_$i.txt"
echo "user_id,rating,gmap_id" > "\$output_file"
zcat "$REVIEW_FILE" | jq --argjson i "$i" -r 'select(.rating == $i) | [.user_id, .rating, .gmap_id] | @csv' >> "\$output_file"
echo "✅ Created \$output_file with \$(wc -l < "\$output_file") lines."
EOF
)
STEP1_JOB_IDS+=("$STEP1_JOB")
done
echo "📊 Step 1 Jobs Submitted: ${STEP1_JOB_IDS[*]}"
DEPENDENCY_STEP1=$(IFS=,; echo "${STEP1_JOB_IDS[*]}")
# ===================== WAIT FOR STEP 1 TO FINISH =====================
wait_for_jobs "$DEPENDENCY_STEP1" "Step 1 (Splitting Reviews)"
# ===================== STEP 2: COUNT gmap_id (PARALLEL JOBS) =====================
echo "📂 Submitting Step 2: Counting gmap_id occurrences..."
STEP2_JOB_IDS=()
for i in {1..5}; do
STEP2_JOB=$(sbatch --parsable --dependency=afterok:${DEPENDENCY_STEP1//,/:} <<EOF
#!/bin/bash
#SBATCH --job-name=BigDataMiningP1Q2_count_rating_$i
#SBATCH --output=count_rating_$i.out
#SBATCH --mem=4G
#SBATCH --cpus-per-task=1
input_file="$DATA_DIR/rating_$i.txt"
output_file="$DATA_DIR/rating_${i}_counts.txt"
if [ -s "\$input_file" ]; then
python3 "$STEP_2_PYTHON_SCRIPT" "\$input_file" "\$output_file"
if [ -s "\$output_file" ]; then
echo "✅ Created \$output_file with \$(wc -l < "\$output_file") lines."
else
echo "❌ Error: Output file \$output_file was not created."
fi
else
echo "⚠️ Skipping empty file: \$input_file"
fi
EOF
)
STEP2_JOB_IDS+=("$STEP2_JOB")
done
echo "📊 Step 2 Jobs Submitted: ${STEP2_JOB_IDS[*]}"
DEPENDENCY_STEP2=$(IFS=,; echo "${STEP2_JOB_IDS[*]}")
# ===================== WAIT FOR STEP 2 TO FINISH =====================
wait_for_jobs "$DEPENDENCY_STEP2" "Step 2 (Counting gmap_id)"
# ===================== STEP 3: AGGREGATE RESULTS =====================
echo "📂 Submitting Step 3: Aggregating final results..."
STEP3_JOB=$(sbatch --parsable --dependency=afterok:${DEPENDENCY_STEP2//,/:} <<EOF
#!/bin/bash
#SBATCH --job-name=BigDataMiningP1Q2_aggregate_results
#SBATCH --output=aggregate_results.out
#SBATCH --mem=8G
#SBATCH --cpus-per-task=2
zcat "$META_FILE" | python3 "$STEP_3_PYTHON_SCRIPT" "$DATA_DIR" "/dev/stdin" "$FINAL_OUTPUT"
if [ -s "$FINAL_OUTPUT" ]; then
echo "✅ Step 3 completed successfully. Final results saved in $FINAL_OUTPUT"
else
echo "❌ Step 3 failed. No output file generated."
exit 1
fi
EOF
)
echo "📊 Step 3 Jobs Submitted: $STEP3_JOB"
# ===================== WAIT FOR STEP 3 TO FINISH =====================
wait_for_jobs "$STEP3_JOB" "Step 3 (Aggregation)"
# ===================== PRINT TOP 3 BUSINESSES =====================
if [ -s "$FINAL_OUTPUT" ]; then
echo "✅ Aggregation complete. Displaying top 3 businesses from $FINAL_OUTPUT"
echo ""
echo "🏆 Top 3 Businesses:"
echo "--------------------------"
head -n 4 "$FINAL_OUTPUT" | tail -n 3 | column -t -s ","
echo "--------------------------"
else
echo "❌ Error: Aggregation failed or output file is empty."
fi
echo "✅ Optimized SLURM Pipeline Completed Successfully."
Explanation 'main_bash.sh'¶
This SLURM-based Bash pipeline orchestrates the batch processing tasks:
- Step 1 – Splitting: We split the compressed review file into five files (one per rating) using zcat and jq within parallel SLURM jobs.
- Job Synchronization: A custom wait_for_jobs function polls SLURM (via squeue and sacct) to ensure all splitting jobs complete before proceeding.
- Step 2 – Counting: With job dependencies set (using --dependency=afterok), we submit parallel jobs to run First.py on each split file.
- Step 3 – Aggregation: Once the counting jobs finish, we launch a final job that pipes the uncompressed metadata (using zcat) into Second.py. The aggregated results are stored in a final CSV file.
- Output Display: The script extracts and prints the top three businesses based on the sorted aggregated data.
- Robust Cluster Integration: The script uses here-documents (EOF) to embed job scripts within the main script, ensuring that all jobs are defined inline. It also employs environment variables for dynamic path configuration, making it easy to adapt to different cluster environments.
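One detail worth pinning down is the `--dependency` flag syntax: SLURM joins multiple job IDs of the same dependency type with colons (commas separate different dependency types). A small helper function (hypothetical, for illustration only, not part of the submitted pipeline) makes the flag construction explicit:

```python
def afterok_dependency(job_ids):
    """Build an sbatch --dependency flag meaning 'start only after ALL of
    these jobs complete successfully'. SLURM joins job IDs of a single
    dependency type with colons, e.g. afterok:101:102:103."""
    return "--dependency=afterok:" + ":".join(str(j) for j in job_ids)

print(afterok_dependency([29970760, 29970761, 29970762]))
# -> --dependency=afterok:29970760:29970761:29970762
```

Keeping the join in one place avoids mixing up the comma-separated list used for our own bookkeeping with the colon-separated form sbatch expects.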
Cluster setup¶
Dear grader (bodek), on your local terminal, run sftp in order to upload/update the files, and then re-enter the cluster to submit the job and track its live output using the commands below (change my username):
# ===================== EXECUTE ON LOCAL TERMINAL TO UPLOAD FILES TO CLUSTER =====================
REMOTE_USER="nathan.pasder"
REMOTE_SFTP_HOST="ftp.rcs.huji.ac.il"
REMOTE_JUMP_HOST="bava.cs.huji.ac.il"
REMOTE_DIR="/sci/home/nathan.pasder"
LOCAL_DIR=~/Documents/HUJI/BigDataMining52002/Final/
OrZukData="/sci/home/orzuk/BigDataMiningExam"
echo "🚀 Uploading new scripts via SFTP..."
sftp -J "$REMOTE_USER@$REMOTE_JUMP_HOST" "$REMOTE_USER@$REMOTE_SFTP_HOST" << EOF
lcd "$LOCAL_DIR"
cd "$REMOTE_DIR"
# Ensure Data directory exists
mkdir Data 2>/dev/null # Won't fail if it already exists
# Upload new scripts (will overwrite existing files)
lcd "$LOCAL_DIR"
put main_bash.sh
put First.py
put Second.py
echo "✅ Upload complete."
bye
EOF
# ===================== EXECUTE JOB FROM LOCAL TERMINAL =====================
ssh -J "$REMOTE_USER@$REMOTE_JUMP_HOST" "$REMOTE_USER@moriah-gw.cs.huji.ac.il"
ls
sbatch main_bash.sh
squeue -u $USER
tail -f bigdata_pipeline.out
# ===================== EXECUTE ON LOCAL TERMINAL TO DOWNLOAD OUTPUT FILES FROM THE CLUSTER =====================
OrZukData="/sci/home/orzuk/BigDataMiningExam"
sftp -J "$REMOTE_USER@$REMOTE_JUMP_HOST" "$REMOTE_USER@$REMOTE_SFTP_HOST" << EOF
lcd "$LOCAL_DIR/Data"
cd "$REMOTE_DIR"
# Ensure Data directory exists
get Data/*
get "$OrZukData/review-Oregon.json.gz"
get "$OrZukData/network-review-Oregon.json.gz"
get "$OrZukData/meta-Oregon.json.gz"
bye
EOF
ls -lh
cd ~/Documents/HUJI/BigDataMining52002/Final/Data
gunzip *.json.gz
Explanation 'Cluster setup'¶
We manage file transfers and job submissions via a two-step process:
- Uploading: We use SFTP with a jump host to securely upload our updated scripts (i.e., main_bash.sh, First.py, and Second.py) to the cluster.
- Job Execution: After logging in via SSH through the jump host, we submit our pipeline with sbatch, monitor job status using squeue, and review log outputs with tail -f.
- Downloading: Once processing is complete, we retrieve the output files back to our local machine via SFTP, ensuring our local workspace is updated with the latest results.
Output¶
nathan.pasder@moriah-gw-02:~ $ ls
Data/ First.py main_bash.sh Second.py*
nathan.pasder@moriah-gw-02:~ $ sbatch main_bash.sh
Submitted batch job 29970759
nathan.pasder@moriah-gw-02:~ $ squeue -u $USER
JOBID PARTITION NAME USER ST TIME NODES NODELIST(REASON)
29970759 glacier BigDataM nathan.p R 0:03 1 glacier-08
29970764 glacier,p BigDataM nathan.p PD 0:00 1 (None)
29970763 glacier,p BigDataM nathan.p PD 0:00 1 (None)
29970762 glacier,p BigDataM nathan.p PD 0:00 1 (None)
29970761 glacier,p BigDataM nathan.p PD 0:00 1 (None)
29970760 glacier,p BigDataM nathan.p PD 0:00 1 (None)
nathan.pasder@moriah-gw-02:~ $ tail -f bigdata_pipeline.out
🚀 Submitting SLURM Jobs for Optimized Pipeline...
📂 Submitting Step 1: Splitting reviews into parallel jobs...
📊 Step 1 Jobs Submitted: 29970760 29970761 29970762 29970763 29970764
✅ Step 1 (Splitting Reviews) completed in (00:01:52)... (00:01:52)
📂 Submitting Step 2: Counting gmap_id occurrences...
📊 Step 2 Jobs Submitted: 29970765 29970766 29970767 29970768 29970769
✅ Step 2 (Counting gmap_id) completed in (00:00:02)... (00:00:02)
📂 Submitting Step 3: Aggregating final results...
📊 Step 3 Jobs Submitted: 29970770
✅ Step 3 (Aggregation) completed in (00:00:16)... (00:00:16)
✅ Aggregation complete. Displaying top 3 businesses from Data/final_results.csv
🏆 Top 3 Businesses:
--------------------------
0x87b21fedbf7f4f5b:0xa379fce9177a3dc6 372 5.0 Tip Top K9 Dog Training
0x5495a269266d6e8b:0x8c2044a8b62bc3cb 348 5.0 Peniche and Associates
0x89aec3802de3cd09:0x1bd2662c90a6a12c 268 5.0 "Pirate and Pixie Dust Destinations LLC"
--------------------------
✅ Optimized SLURM Pipeline Completed Successfully.
^C
Explanation & result analysis¶
Our final output is a CSV table containing the columns gmap_id, name, avg_rating, and total_reviews. The aggregated results show that:
- Each gmap_id is assigned a weighted average rating calculated from the individual rating counts.
- The top three entries, sorted by descending average rating (and then total review count), all achieved an average rating of 5.0, with review counts of 372, 348, and 268, respectively.
This pipeline efficiently processes and aggregates the review data, mapping it to business names and ranking businesses by both quality (average rating) and popularity (total reviews).
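The ranking rule (average rating first, review count as a tiebreaker, both descending) can be checked on a small made-up frame (the IDs and numbers below are illustrative only):

```python
import pandas as pd

# Three businesses tied at 5.0 plus one with a lower rating but more reviews
df = pd.DataFrame({
    "gmap_id":       ["a", "b", "c", "d"],
    "avg_rating":    [5.0, 5.0, 4.9, 5.0],
    "total_reviews": [348, 372, 500, 268],
})

# Same two-key descending sort used in Second.py
top3 = df.sort_values(by=["avg_rating", "total_reviews"], ascending=[False, False]).head(3)
print(list(top3["gmap_id"]))  # ties at 5.0 broken by review count: ['b', 'a', 'd']
```

Note that "c" is excluded despite having the most reviews, because the rating key dominates the tiebreaker.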
Part 2: Streaming Algorithms¶
Q1. Streaming Sampling Algorithm¶
Write a Python function that reads the review-Oregon.json file line-by-line, i.e., one line at a time (see code template below). Your code should implement online sampling of 1000 random users and all of their ratings. That is, you should initialize and update a data structure such that after any number $n$ of processed lines corresponding to $k \leq n$ distinct users, your data structure stores the identity of $\min(k, 1000)$ users chosen uniformly at random from the first $k$ users (without replacement), together with all lines corresponding to these users. After finishing processing all lines in the file, compute 'ave_rating' for all businesses using this sample of $1000$ users, and make a scatter plot of this ave_rating vs. the ave_rating from Unix Q2 computed using all users. Describe the results.
Notes: You should never store the entire file in memory. After reading each line, if you decide not to include the corresponding user in your sample, you should throw the line away and never use it again. Exclude from the scatter plot businesses that were not reviewed by any of the 1000 users in your sample.
Q1_Sol¶
import json
import random
import sys
import matplotlib.pyplot as plt
import pandas as pd
# Constants
FILE_PATH = 'Data/review-Oregon.json'
CSV_PATH = 'Data/final_results.csv'
RESERVOIR_SIZE = 1000 # Target sample size
PROGRESS_INTERVAL = 1000 # Frequency of progress updates
def print_df_info(df, name):
    """Prints basic information about a DataFrame."""
    print(f"\n📊 {name} DataFrame Info:")
    print(df.head(5))  # head() alone is a no-op in a script; print it explicitly
    print(f"Rows: {df.shape[0]}, Columns: {df.shape[1]}")
    print("-" * 50)
def stream_sampling(file_path, reservoir_size, progress_interval):
    """Reservoir-samples `reservoir_size` unique users and keeps all of their reviews."""
    user_reviews = {}       # Maps each sampled user_id -> list of (gmap_id, rating)
    distinct_users = set()  # Track all distinct users seen so far
    line_count = 0
    total_lines = sum(1 for _ in open(file_path, 'r'))  # Get total lines in file
    print(f"📂 Total lines in file: {total_lines}")
    with open(file_path, 'r') as f:
        for line in f:
            line_count += 1
            try:
                review = json.loads(line)
            except json.JSONDecodeError:
                continue  # Skip malformed lines
            user_id = review.get('user_id')
            biz_id = review.get('gmap_id')
            rating = review.get('rating')
            if not user_id or not biz_id or rating is None:
                continue  # Skip invalid entries
            if user_id in user_reviews:
                # User is already in the sample: keep every one of their reviews
                user_reviews[user_id].append((biz_id, rating))
            elif user_id not in distinct_users:
                # First time we see this user: decide once whether to sample them
                if len(user_reviews) < reservoir_size:
                    user_reviews[user_id] = [(biz_id, rating)]
                else:
                    # The k-th distinct user enters with probability reservoir_size / k
                    r = random.randint(0, len(distinct_users))  # k - 1 = len(distinct_users)
                    if r < reservoir_size:
                        # Evict a uniformly chosen sampled user and discard their reviews
                        removed_user = random.choice(list(user_reviews))
                        del user_reviews[removed_user]
                        user_reviews[user_id] = [(biz_id, rating)]
            distinct_users.add(user_id)
            # Dynamic progress update
            if line_count % progress_interval == 0:
                distinct_users_pct = (len(distinct_users) / line_count) * 100
                lines_pct = (line_count / total_lines) * 100
                sampled_users_pct = (len(user_reviews) / reservoir_size) * 100
                sys.stdout.write(f"\r⌛ Processed {line_count}/{total_lines} lines ({lines_pct:.2f}%), "
                                 f"Distinct Users: {len(distinct_users)}/{line_count} ({distinct_users_pct:.2f}%), "
                                 f"Sampled Users: {len(user_reviews)}/{reservoir_size} ({sampled_users_pct:.2f}%)")
                sys.stdout.flush()
    print("\n✅ Finished processing for sampling.")
    print(f"Total distinct users encountered: {len(distinct_users)}")
    print(f"Final sample size: {len(user_reviews)}")
    print(f"Total collected reviews: {sum(len(v) for v in user_reviews.values())}")
    data_df = pd.DataFrame(
        [(u, b, r) for u, reviews in user_reviews.items() for b, r in reviews],
        columns=['user_id', 'gmap_id', 'rating'])
    print_df_info(data_df, "Sampled Reviews")
    return data_df
def compute_sample_avg_ratings(df):
    """Computes average ratings for businesses from the sampled users."""
    avg_df = df.groupby('gmap_id')['rating'].mean().reset_index().rename(columns={'rating': 'sample_avg_rating'})
    print_df_info(avg_df, "Sample Average Ratings")
    return avg_df

def load_overall_business_avg(csv_path):
    """Loads overall business average ratings from CSV as a DataFrame."""
    df = pd.read_csv(csv_path)[['gmap_id', 'avg_rating']]
    print_df_info(df, "Overall Business Ratings")
    return df

def plot_comparison(sample_df, overall_df):
    """Creates a scatter plot comparing sample vs overall business ratings."""
    merged_df = sample_df.merge(overall_df, on='gmap_id', how='inner')
    print_df_info(merged_df, "Merged Ratings for Comparison")
    plt.figure(figsize=(8, 6))
    plt.scatter(merged_df['sample_avg_rating'], merged_df['avg_rating'], alpha=0.5)
    plt.xlabel('Sampled Average Rating')
    plt.ylabel('Overall Average Rating')
    plt.title('Sample vs Overall Business Ratings')
    plt.grid(True)
    plt.show()

def main():
    """Main function to execute streaming sampling and comparison plot using DataFrames."""
    sample_df = stream_sampling(FILE_PATH, RESERVOIR_SIZE, PROGRESS_INTERVAL)
    sample_avg_ratings = compute_sample_avg_ratings(sample_df)
    overall_avg_ratings = load_overall_business_avg(CSV_PATH)
    plot_comparison(sample_avg_ratings, overall_avg_ratings)

if __name__ == "__main__":
    main()
📂 Total lines in file: 11012170
⌛ Processed 11012000/11012170 lines (100.00%), Distinct Users: 2764026/11012000 (25.10%), Sampled Users: 1000/1000 (100.00%)
✅ Finished processing for sampling.
Total distinct users encountered: 2764082
Final sample size: 1000
Total collected reviews: 55314
📊 Sampled Reviews DataFrame Info:
Rows: 55314, Columns: 3
--------------------------------------------------
📊 Sample Average Ratings DataFrame Info:
Rows: 22939, Columns: 2
--------------------------------------------------
📊 Overall Business Ratings DataFrame Info:
Rows: 93006, Columns: 2
--------------------------------------------------
📊 Merged Ratings for Comparison DataFrame Info:
Rows: 22939, Columns: 3
--------------------------------------------------
Explanation & result analysis¶
This code uses incremental reservoir sampling to select a uniform random subset of exactly 1,000 distinct users from a massive stream of reviews without loading the entire dataset into memory. Each JSON line is parsed on the fly for user_id, gmap_id, and rating. If the user is already in the reservoir, we simply store their review. Otherwise, we either add the user if the reservoir is under capacity or apply a probabilistic replacement step to ensure all users have an equal chance of being sampled. This approach maintains a low memory footprint while capturing a representative cross-section of user ratings.
After streaming through $\sim11$ million lines and discovering $\sim2.76$ million unique users, we end up with 1,000 sampled users who collectively contributed about 55k reviews. We aggregate these reviews to compute sample-based average ratings for 22k businesses and then merge these with the overall average ratings from the full dataset. In the scatter plot, we observe a positive correlation between the sample-based and overall averages, indicating that despite the stochastic nature of reservoir sampling, the sampled distribution largely reflects the true rating landscape. Some variability arises for businesses with fewer sampled reviews, but overall, the method provides a robust approximation of the global rating patterns with minimal computational overhead.
Reservoir Coverage: From $\sim2.76$ million distinct users, the algorithm samples 1,000, capturing 55k reviews spanning 22k businesses. The scatter plot compares the sampled average rating (x-axis) against the overall average rating (y-axis).
A large cluster appears between ratings of 4.0 and 5.0 on both axes, indicating that the sample-based ratings align well with the global averages for many businesses.
Some points fall below 3.0 on the overall average axis, reflecting businesses that are less favorably rated in the complete dataset, while the sample sometimes shows different averages—likely due to fewer sampled reviews.
Although the sample is relatively small, the high-level patterns mirror the overall dataset, demonstrating that reservoir sampling effectively captures the broader rating trends. Variability is expected for businesses with few sample reviews, but the overall density near higher ratings underscores that the sample aligns well with global user sentiments.
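To sanity-check that the user-level reservoir scheme is indeed uniform, here is a minimal self-contained simulation (a sketch on toy user IDs, independent of the exam data): with 10 distinct users and a reservoir of size 3, each user should be sampled in roughly 3/10 = 30% of the runs.

```python
import random
from collections import Counter

def sample_users(user_stream, k):
    """Reservoir-sample k distinct users from a stream of user IDs."""
    seen = set()       # all distinct users so far
    reservoir = set()  # current sample of min(k, #distinct users) users
    for user in user_stream:
        if user in seen:
            continue                            # decision already made for this user
        if len(reservoir) < k:
            reservoir.add(user)
        elif random.randint(0, len(seen)) < k:  # keep the n-th new user w.p. k/n
            reservoir.discard(random.choice(sorted(reservoir)))
            reservoir.add(user)
        seen.add(user)
    return reservoir

random.seed(0)
counts = Counter()
for _ in range(20000):
    counts.update(sample_users(range(10), 3))

# Each user should appear close to 20000 * 3/10 = 6000 times
print(sorted(counts.values()))
```

The counts concentrate around 6000 per user, empirically confirming the uniform-inclusion property that the analysis above relies on.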
Part 3: Networks¶
! pip install python-louvain
! pip install folium
import pandas as pd
import networkx as nx
import matplotlib.pyplot as plt
import numpy as np
import community as community_louvain
from geopy.distance import geodesic
import folium
import random
from branca.colormap import LinearColormap
Q1. Exploratory Data Analysis¶
a.¶
Read the data file network-table.txt you created in the previous Unix question.
In addition, read the file meta-Oregon.json, which contains additional information about each gmap_id, such as category, website, and more.
Display the first five rows of each dataset and explain what is shown and what the data represents.
Note: If you failed to create the correct network-table.txt file in the unix part, you can use for this question the file we supply.
import pandas as pd
import json
NETWORK_TABLE_PATH = 'Data/network-table.txt'
# Read network-table.txt
network_df = pd.read_csv(NETWORK_TABLE_PATH)
print("📊 First five rows of network-table.txt:")
network_df.head()
📊 First five rows of network-table.txt:
| | gmap_id_from | user_id | rating_from | gmap_id_to | rating_to |
|---|---|---|---|---|---|
| 0 | 0x54c43d32ba265427:0x163b39283c8307c7 | 105765473316666918624 | 4 | 0x54c49cb3a08ecdab:0x424e43babf634dd5 | 4 |
| 1 | 0x54c49cb3a08ecdab:0x424e43babf634dd5 | 105765473316666918624 | 4 | 0x54c43d32ba265427:0x163b39283c8307c7 | 4 |
| 2 | 0x54c11e14b45a4339:0x558653a0c0fe6f2a | 109056019028012520715 | 4 | 0x54bfff9fb04ab4bd:0xdc40f423709d812f | 4 |
| 3 | 0x54bfff9fb04ab4bd:0xdc40f423709d812f | 109056019028012520715 | 4 | 0x54c11e14b45a4339:0x558653a0c0fe6f2a | 4 |
| 4 | 0x54c11e14b45a4339:0x558653a0c0fe6f2a | 109056019028012520715 | 4 | 0x54c11e14b45a4339:0x558653a0c0fe6f2a | 4 |
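The table appears to encode co-review edges: two businesses are linked whenever the same user reviewed both (row 4 suggests self-pairs may also be included). A toy sketch of how such a table could be derived from raw reviews — this is an assumption about the construction, with made-up IDs, and `permutations` here excludes self-pairs:

```python
from itertools import permutations
import pandas as pd

# Made-up raw reviews: user u1 reviewed two businesses, u2 only one
reviews = pd.DataFrame({
    "user_id": ["u1", "u1", "u2"],
    "gmap_id": ["b1", "b2", "b1"],
    "rating":  [4, 5, 3],
})

# For each user, emit one directed edge per ordered pair of their reviews
edges = []
for user, grp in reviews.groupby("user_id"):
    recs = list(grp.itertuples(index=False))
    for a, b in permutations(recs, 2):
        edges.append((a.gmap_id, user, a.rating, b.gmap_id, b.rating))

edge_df = pd.DataFrame(edges, columns=[
    "gmap_id_from", "user_id", "rating_from", "gmap_id_to", "rating_to"])
print(edge_df)
```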
META_DATA_PATH = 'Data/meta-Oregon.json'
import pandas as pd
import json
# Read meta-Oregon.json (assuming each line is a JSON object)
meta_data = []
with open(META_DATA_PATH, 'r') as f:
    for line in f:
        try:
            meta_data.append(json.loads(line))
        except json.JSONDecodeError:
            continue  # Skip malformed lines
meta_df = pd.DataFrame(meta_data)
print("📊 First five rows of meta-Oregon.json:")
meta_df.head()
📊 First five rows of meta-Oregon.json:
| | name | address | gmap_id | description | latitude | longitude | category | avg_rating | num_of_reviews | price | hours | MISC | state | relative_results | url |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | iPolish Nails Spa | None | 0x80dce9997c8d25fd:0xc6c81c1983060cbc | None | 45.597767 | -127.269699 | [Nail salon, Service establishment] | 5.0 | 1 | None | [[Thursday, 10AM–7PM], [Friday, 10AM–7PM], [Sa... | {'Payments': ['Debit cards', 'Credit cards']} | Open ⋅ Closes 7PM | [0x80dce9aec70a4ee9:0xc7cf24c90e4b537f, 0x80dc... | https://www.google.com/maps/place//data=!4m2!3... |
| 1 | Karens Country Confections, LLC | None | 0x89d0ba60af135b2f:0xd548538d7a3e2b8 | None | 45.598021 | -127.269639 | [Bakery, Service establishment] | 5.0 | 4 | None | [[Thursday, Closed], [Friday, Closed], [Saturd... | {'Service options': ['Delivery']} | Closed ⋅ Opens 9AM Sat | [0x89d0ac763567481b:0x90e7c3d44df486ce, 0x89d0... | https://www.google.com/maps/place//data=!4m2!3... |
| 2 | E & L General Contractors Inc | E & L General Contractors Inc, 14161 S Redland... | 0x549576c32c7ce82d:0x166266dbfadf6660 | None | 45.360527 | -122.575448 | [Concrete contractor] | 5.0 | 6 | None | None | None | None | [0x54957451d00ee61b:0xe247a7cb0184f2e, 0x54957... | https://www.google.com/maps/place//data=!4m2!3... |
| 3 | McDonald Orthodontics | McDonald Orthodontics, 1855 W Nob Hill St SE #... | 0x54bfff5952aad583:0xad7afdc825730614 | None | 44.922504 | -123.043930 | [Orthodontist] | 4.9 | 198 | None | [[Thursday, 8AM–4:30PM], [Friday, Closed], [Sa... | {'From the business': ['Identifies as veteran-... | Closed ⋅ Opens 8AM | [0x54bfff5c0ef7b38b:0xa14bec68116fbe4a, 0x54bf... | https://www.google.com/maps/place//data=!4m2!3... |
| 4 | Donatello's at Marion Forks | Donatello's at Marion Forks, 34970 OR-22, Idan... | 0x54bf0922053d25ed:0x73bbe9954ead56b2 | None | 44.615250 | -121.948546 | [Pizza Takeout] | 4.5 | 27 | None | [[Thursday, 11AM–7PM], [Friday, 11AM–7PM], [Sa... | {'Service options': ['Takeout', 'Delivery']} | Temporarily closed | None | https://www.google.com/maps/place//data=!4m2!3... |
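As an aside, when every line of the file is valid JSON, pandas can read the JSON-Lines format directly with `read_json(lines=True)`; the manual loop above is only needed to skip malformed lines. A minimal sketch with inline data:

```python
import io
import pandas as pd

# Two JSON records in JSON-Lines format (one object per line)
raw = io.StringIO(
    '{"name": "A", "avg_rating": 4.5, "num_of_reviews": 120}\n'
    '{"name": "B", "avg_rating": 3.0, "num_of_reviews": 15}\n'
)
df = pd.read_json(raw, lines=True)
print(df)
```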
b.¶
Read the file network-table.txt and plot a histogram showing the number of reviews by each user (user_id).
Next, plot a histogram showing the number of unique users reviewing each business (gmap_id).
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib.ticker import ScalarFormatter
# Reuse network_df loaded in part (a)
# Count unique businesses reviewed per user
user_review_counts = network_df[['user_id', 'gmap_id_from']].drop_duplicates().groupby('user_id').size().reset_index()
user_review_counts.columns = ['user_id', 'review_count']
# Count unique users reviewing each business
gmap_review_counts = network_df[['gmap_id_from', 'user_id']].drop_duplicates().groupby('gmap_id_from').size().reset_index()
gmap_review_counts.columns = ['gmap_id', 'user_count']
# Plot side-by-side histograms
fig, axes = plt.subplots(1, 2, figsize=(16, 6), sharey=True, facecolor='white')
# Adjusting bins for better visibility of small values
bins_user = max(10, min(50, len(user_review_counts['review_count'].unique())))
bins_gmap = max(10, min(50, len(gmap_review_counts['user_count'].unique())))
# Histogram of unique reviews per user
axes[0].hist(user_review_counts['review_count'], bins=bins_user, edgecolor='black', alpha=0.7, color='royalblue')
axes[0].set_xlabel('Unique Businesses Reviewed per User', fontsize=12)
axes[0].set_ylabel('Number of Users', fontsize=12)
axes[0].set_title('Histogram of Unique Reviews per User', fontsize=14, fontweight='bold')
axes[0].grid(axis='y', linestyle='--', alpha=0.7)
axes[0].set_yscale('log') # Adjusting scale to better show small values
axes[0].yaxis.set_major_formatter(ScalarFormatter())
# Histogram of unique users per business
axes[1].hist(gmap_review_counts['user_count'], bins=bins_gmap, edgecolor='black', alpha=0.7, color='darkorange')
axes[1].set_xlabel('Unique Users per Business', fontsize=12)
axes[1].set_title('Histogram of Unique Users per Business', fontsize=14, fontweight='bold')
axes[1].grid(axis='y', linestyle='--', alpha=0.7)
axes[1].set_yscale('log') # Adjusting scale to better show small values
axes[1].yaxis.set_major_formatter(ScalarFormatter())
plt.tight_layout()
plt.show()
Explanation & result analysis¶
Reading the Data
- We load `network-table.txt` into a DataFrame (`network_df`) to get pairwise relationships between businesses (`gmap_id_from`, `gmap_id_to`) and the users (`user_id`) who reviewed them.
- We parse `meta-Oregon.json` line by line, converting each JSON record into a dictionary and collecting them in `meta_df`. This metadata contains additional attributes such as name, address, and category for each `gmap_id`.
Computing Counts
- `user_review_counts`: We group by `user_id` and count the unique businesses reviewed by each user, revealing how many distinct businesses each user has rated.
- `gmap_review_counts`: We group by `gmap_id_from` and count the unique users who reviewed each business, indicating each business's reviewer base.
Plotting Histograms
- We create two side-by-side histograms: one for `user_review_counts` (the number of businesses per user) and one for `gmap_review_counts` (the number of users per business).
- We apply a log scale on the y-axis to better visualize the broad range of frequencies, label the axes, and add grid lines for clarity.
Businesses Reviewed per User (Left Plot):
Shows a right-skewed distribution where most users review only a few businesses, while a smaller group covers many.
Unique Users per Business (Right Plot):
Also right-skewed: many businesses attract only a few users, whereas a handful are much more popular.
Interpretation:
Both plots confirm the typical “long-tail” pattern in user-generated reviews, with heavy clustering at low counts and a smaller fraction of high-activity cases.
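The drop-duplicates-then-group counting pattern described above can be checked on a tiny hand-made table (the column names mirror network-table.txt, but the rows here are made up for illustration):

```python
import pandas as pd

toy = pd.DataFrame({
    "user_id":      ["u1", "u1", "u1", "u2", "u2", "u3"],
    "gmap_id_from": ["b1", "b2", "b1", "b1", "b3", "b1"],
})

# Distinct businesses reviewed by each user
per_user = (toy[["user_id", "gmap_id_from"]]
            .drop_duplicates()
            .groupby("user_id").size())
print(per_user.to_dict())  # {'u1': 2, 'u2': 2, 'u3': 1}

# Distinct users reviewing each business
per_biz = (toy[["gmap_id_from", "user_id"]]
           .drop_duplicates()
           .groupby("gmap_id_from").size())
print(per_biz.to_dict())  # {'b1': 3, 'b2': 1, 'b3': 1}
```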
c.¶
- Display the distribution of business categories using the `meta-Oregon` file. For each business having multiple categories, use only the first category. Show only the top 30 categories having the largest number of businesses. Highlight all the restaurant categories in a different color.
- Choose 4 of the top 30 categories and show, for each one of them, the distribution of `avg_rating` (from the `meta-Oregon` file) for this category.
import matplotlib.pyplot as plt
# Extract the first category for each business
meta_df['category'] = meta_df['category'].apply(lambda x: x[0] if isinstance(x, list) and len(x) > 0 else None)
# Count businesses per category
category_counts = meta_df['category'].value_counts().head(30).sort_values()
# Highlight only categories that contain 'restaurant' in any form
colors = ['darkorange' if 'restaurant' in rest.lower() else 'royalblue' for rest in category_counts.index]
plt.figure(figsize=(12, 7), facecolor='white')
category_counts.sort_values().plot(kind='barh', color=colors, edgecolor='black', alpha=0.7)
plt.xlabel('Number of Businesses', fontsize=12)
plt.ylabel('Category', fontsize=12)
plt.title('Top 30 Business Categories in Oregon', fontsize=14, fontweight='bold')
plt.grid(axis='x', linestyle='--', alpha=0.7)
plt.show()
# Select the highest 4 categories
top_ranked_labels = category_counts.index[-4:]
# Plot settings
plt.figure(figsize=(12, 8))
plot_index = 1
for label in top_ranked_labels:
    rating_values = meta_df[meta_df['category'] == label]['avg_rating'].dropna()  # Drop NaNs
    # Create subplot
    plt.subplot(2, 2, plot_index)
    plot_index += 1
    # Histogram
    plt.hist(rating_values, bins=40, edgecolor='black', alpha=0.7)
    # Mean line
    plt.axvline(x=rating_values.mean(), linestyle="dashed", linewidth=2, label='Mean', color='red')
    # Labels and title
    plt.xlabel("avg_rating")
    plt.ylabel("Frequency")
    plt.title(f"Rating Distribution: {label}")
    plt.legend()
plt.tight_layout()
plt.show()
Explanation & result analysis¶
Category Extraction & Counting
- We replace each business's original category list with only its first entry, ensuring a single category per business.
- We then compute `value_counts()` for these categories, select the top 30, and sort them.
Bar Chart
- We color each bar orange if its category name contains the string `'restaurant'` (case-insensitive), otherwise blue.
- The horizontal bar chart shows how many businesses fall into each of these top 30 categories.
Rating Distributions
- We pick the top four categories by business count.
- For each chosen category, we filter out `NaN` ratings, then create a histogram of its `avg_rating`.
- A dashed vertical line indicates the mean rating for that category.
Top Categories:
The bar chart reveals that Restaurant is the most frequent category, followed by Park, Coffee shop, Gas station, and so on.
Highlighting “Restaurant”:
All categories containing “restaurant” are shown in a different color to emphasize their prevalence.
Rating Histograms:
The four most frequent categories (e.g., Gas station, Coffee shop, Park, Restaurant) exhibit slightly different rating distributions. Most businesses cluster around high ratings (4.0–5.0), but each category shows its own spread, with mean ratings marked by the dashed lines.
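A minimal sketch of how the per-category spread could also be summarized numerically with a single groupby (toy data; the category names and ratings are illustrative only):

```python
import pandas as pd

toy_meta = pd.DataFrame({
    "category":   ["Restaurant", "Restaurant", "Park", "Park", "Coffee shop"],
    "avg_rating": [4.0, 5.0, 4.5, 4.5, 3.0],
})

# Per-category count, mean, and standard deviation of avg_rating
summary = (toy_meta.groupby("category")["avg_rating"]
           .agg(["count", "mean", "std"])
           .sort_values("count", ascending=False))
print(summary)
```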
d.¶
Finally, filter the meta-Oregon file to include only businesses with more than 100 reviews. Use Folium to create a map showing the businesses with more than 100 reviews as circles, color them by avg_rating from red to green, and make their size proportional to the number of reviews (use radius = num_reviews / 1000).
Using all of these, describe the data and explain its meaning.
import pandas as pd
import folium
# Remove duplicates based on 'gmap_id', keeping only the first occurrence
meta_df_unique = meta_df.drop_duplicates(subset='gmap_id', keep='first')
# Keep only businesses with more than 100 reviews
filtered_df = meta_df_unique[meta_df_unique['num_of_reviews'] > 100]
# Create the map centered around Oregon (or a suitable central location in the dataset)
m = folium.Map(location=[45.5236, -122.6750], zoom_start=7) # Centering map on Portland, Oregon
# Add circles to the map based on the filtered data
for _, row in filtered_df.iterrows():
    # Radius proportional to the number of reviews (num_reviews / 1000)
    radius = row['num_of_reviews'] / 1000
    # Color the circle by avg_rating, interpolating from red (low) to green (high)
    color = f"#{int((1 - row['avg_rating'] / 5) * 255):02x}{int((row['avg_rating'] / 5) * 255):02x}00"
    # Add a circle marker with a popup showing name, rating, and review count
    folium.CircleMarker(
        location=[row['latitude'], row['longitude']],
        radius=radius,
        color=color,
        fill=True,
        fill_color=color,
        fill_opacity=0.6,
        popup=f"<strong>{row['name']}</strong><br>Rating: {row['avg_rating']}<br>Reviews: {row['num_of_reviews']}",
    ).add_to(m)
# Display the map (the last expression in a notebook cell renders inline)
m
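The inline hex expression used for the circle colors is compact but opaque; as a sketch, the same red-to-green interpolation can be pulled out into a small helper (a refactoring suggestion, not part of the submitted cell):

```python
def rating_to_hex(avg_rating, max_rating=5):
    """Linearly interpolate from red (#ff0000 at 0) to green (#00ff00 at max_rating)."""
    frac = avg_rating / max_rating
    red = int((1 - frac) * 255)    # red channel fades out as the rating rises
    green = int(frac * 255)        # green channel fades in
    return f"#{red:02x}{green:02x}00"

print(rating_to_hex(5.0))  # '#00ff00'  pure green
print(rating_to_hex(0.0))  # '#ff0000'  pure red
print(rating_to_hex(2.5))  # '#7f7f00'  olive midpoint
```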